跳到主要内容

Zookeeper JavaAPI 事件监听

watcher 是什么?

zookeeper 提供了数据的发布/订阅功能,多个订阅者可同时监听某一特定主题对象,当该主题对象的自身状态发生变化时(例如节点内容改变、节点下的子节点列表改变等),会实时、主动通知所有订阅者。

zookeeper 采用了 Watcher 机制实现数据的发布/订阅功能(发布订阅模式)。该机制在被订阅对象发生变化时会异步通知客户端,因此客户端不必在 Watcher 注册后轮询阻塞,从而减轻了客户端压力。

watcher 机制实际上与观察者模式类似,也可看作是一种观察者模式在分布式场景下的实现方式。

watcher 的架构

Zookeeper 分为如下两类:

  • Zookeeper 服务端
  • Zookeeper 客户端

客户端首先将 Watcher 注册到服务端,同时将 Watcher 对象保存到客户端的 Watch 管理器中。当 ZooKeeper 服务端监听的数据状态发生变化时,服务端会主动通知客户端,接着客户端的 Watch 管理器会触发相关 Watcher 来回调相应处理逻辑,从而完成整体的数据发布/订阅流程。

watcher 的特性

特性说明
一次性watcher是一次性的,一旦被触发就会移除,再次使用时需要重新注册
客户端顺序回调watcher回调是顺序串行化执行的,只有回调后客户端才能看到最新的数据状态。一个watcher回调逻辑不应该多,以免影响别的watcher执行
轻量级WatchEvent是最小的通信单元,结构上只包含通知状态、事件类型和节点路径,并不会告诉数据节点变化前后的具体内容
时效性watcher只有在当前session彻底失效时才会无效,若在session有效期内快速重连成功,则watcher依然存在,仍可接收通知

watcher 的接口设计

Watcher 是一个接口,任何实现了 Watcher 接口的类就是一个新的 Watcher,Watcher 内部包含了两个枚举类:KeeperState、EventType。(这个 WatcherType 是观察者类型,Children、Data、Any)

通知状态:KeeperState

KeeperState 是客户端与服务端连接状态发生变化时对应的通知类型。其枚举属性如下:

枚举属性说明
SyncConnected客户端与服务器正常连接时
Disconnected客户端与服务器断开连接时
Expired会话session失效时
AuthFailed身份认证失败时

事件类型:EventType

枚举属性说明
None
NodeCreated Watcher监听的数据节点被创建时
NodeDeleted Watcher监听的数据节点被删除时
NodeDataChangedWatcher监听的数据节点内容发生变更时(无论内容数据是否变化)
NodeChildrenChangedWatcher监听的数据节点的子节点列表发生变更时

EventType 是数据节点(znode)发生变化时对应的通知类型。EventType 变化时 KeeperState 永远处于 SyncConnected 通知状态下;当 KeeperState 发生变化时,EventType 永远为 None。

注:客户端接收到的相关事件通知中只包含状态及类型等信息,不包括节点变化前后的具体内容,变化前的数据需业务自身存储,变化后的数据需调用 get 等方法重新获取;

注册 watcher 的方法

注册方式CreatedChildrenChangedChangedDeleted
zk.exists("/node-x",watcher)可监控可监控可监控
zk.getData("/node-x",watcher)可监控可监控
zk.getChildren("/node-x",watcher)可监控可监控

使用检查节点是否存在的方法为节点注册 watcher,测试 watcher 的一次性

exists 创建监听

exists 是一次性的,所以如果还想继续监听这个节点,需要它被触发后再次创建一个监听

这个 exists 方法有两个主要的重载

// 使用连接对象的监听器(就是下面那个连接 zk 时填入的监听器)
exists(String path, boolean b)

// 使用自定义的监听器
exists(String path, Watcher w)

重载一:使用连接对象的监听器

class ZookeeperConnectionTest {

static ZooKeeper zooKeeper = null;

// 先连接上
@BeforeAll
static void connect() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, (event) -> {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功");
countDownLatch.countDown();
}
System.out.println("path==" + event.getPath());
System.out.println("eventType==" + event.getType());
});
// 主线程阻塞等待连接对象的创建成功
countDownLatch.await();
// 打印会话编号
System.out.println(zooKeeper.getSessionId());
}

@Test
void watcherExists() throws KeeperException, InterruptedException {
// 参数1:节点的路径
// 参数2:使用连接对象中的 watcher
zooKeeper.exists("/watcher1", true);
TimeUnit.SECONDS.sleep(60);
}

// 先连接上
@AfterAll
static void exit() throws Exception {
zooKeeper.close(); // 释放连接
}
}

在终端创建这个节点后

就可以在控制台中打印出这个节点了(注意,得创建的是 /watcher1

重载二:使用自定义的监听器

@Test
void watcherExists2() throws KeeperException, InterruptedException {
// 参数1:节点的路径
// 参数2:自定义的监听器
zooKeeper.exists("/watcher1", event -> {
System.out.println("自定义watcher");
System.out.println("path:" + event.getPath());
System.out.println("evenType:" + event.getType());
System.out.println("evenState:" + event.getState());
});
TimeUnit.SECONDS.sleep(60);
}

在控制台修改这个节点

打印在控制台的为修改事件

重复监听的方法

上面说过了默认情况下 exists 是一次性的,所以可以在它每次被执行后再次监听

注意如果使用了 this 指针就不能使用 Lambda 了,具体原因看 Lambda的笔记,总之就是那个 this 指针会指向调用它的外部类,这里得用匿名函数

@Test
void watcherExists2() throws KeeperException, InterruptedException {
// 参数1:节点的路径
// 参数2:自定义的监听器
zooKeeper.exists("/watcher1", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("自定义watcher");
System.out.println("path:" + event.getPath());
System.out.println("evenType:" + event.getType());
System.out.println("evenState:" + event.getState());
try {
zooKeeper.exists("/watcher1",this);
} catch (Exception e) {
e.printStackTrace();
}
}
});
TimeUnit.SECONDS.sleep(60);
}

getData 监听数据变化

这个方法主要有这两个重载方法

// 使用连接对象的监听器
getData(String path, boolean b,Stat stat)

// 自定义监听器
getData(String path, Watcher w,Stat stat)

其它参数都和上面一样,这里的这个 Stat 是 ZNode 的元数据(上面的 exists 返回值就是 Stat)

修改数据后打印的如下所示

@Test
void watcherGetData1() throws KeeperException, InterruptedException {
// 参数1:节点的路径
// 参数2:使用连接对象的watcher
zooKeeper.getData("/watcher1", true, null);
TimeUnit.SECONDS.sleep(100);
}

配置中心案例

工作中有这样的一个场景: 数据库用户名和密码信息放在一个配置文件中,应用读取该配置文件,配置文件信息放入缓存。若数据库的用户名和密码改变时候,还需要重新加载缓存,比较麻烦,通过 ZooKeeper 可以轻松完成,当数据库发生变化时自动完成缓存同步。

设计思路:

  1. 连接 zookeeper 服务器
  2. 读取 zookeeper 中的配置信息,注册 watcher 监听器,存入本地变量
  3. 当 zookeeper 中的配置信息发生变化时,通过 watcher 的回调方法捕获数据变化事件
  4. 重新获取配置信息

现在 zookeeper 初始化数据

在 zookeeper 中插入三条记录:

create /config "config"
create /config/url "127.0.0.1:3306"
create /config/username "root"
create /config/password "070313"

具体的编码

@Data
@ToString
public class MyConfigCenter implements Watcher {
// zk连接对象
ZooKeeper zooKeeper;
// 计数器对象
private static CountDownLatch countDownLatch = new CountDownLatch(1);
// zk的连接串
private String ip = "127.0.0.1:2181";
// 用于本地化配置存储信息
private String url;
private String username;
private String password;

// 构造这个类时执行初始化函数
MyConfigCenter() {
initValue();
}

/**
* 连接 zookeeper 服务器,读取配置信息
*/
private void initValue() {
//创建连对象
try {
// 创建链接
if (zooKeeper == null) {
zooKeeper = new ZooKeeper(ip, 5000, this);
}

// 阻塞线程,等待连接成功
countDownLatch.await();
// 读取配置信息
this.url = new String(zooKeeper.getData("/config/url", true, null));
this.username = new String(zooKeeper.getData("/config/username", true, null));
this.password = new String(zooKeeper.getData("/config/password", true, null));
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void process(WatchedEvent event) {
try {
// 时间类型
if (event.getType() == Watcher.Event.EventType.None) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功");
countDownLatch.countDown();
} else if (event.getState() == Watcher.Event.KeeperState.Disconnected) {
System.out.println("断开连接");

} else if (event.getState() == Watcher.Event.KeeperState.Expired) {
System.out.println("会话超时");
zooKeeper = new ZooKeeper(ip, 5000, this); // 重连

} else if (event.getState() == Watcher.Event.KeeperState.AuthFailed) {
System.out.println("验证失败");
}
}

// 不用管上面那个,那个 None 事件一般是连接状态相关的,主要就是这里
else if (event.getType() == Event.EventType.NodeDataChanged) {
// 当配置信息发生变化,就重新去加载配置信息
initValue();
}
} catch (Exception e) {
e.printStackTrace();
}
}


public static void main(String[] args) throws InterruptedException {
MyConfigCenter myConfigCenter = new MyConfigCenter();
// 模拟轮询检查数据是否更新
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(5);
System.out.println(myConfigCenter);
System.out.println("==================================================");
}
}
}

然后在它运行的时候修改 ZK 里面的数据测试

然后看控制台,可以看到它已经改成最新的数据了

分布式唯一 Id 案例

在过去的单库单表型系统中,通常可以使用数据库字段自带的 auto_increment 属性来自动为每条记录生成一个唯一的ID。但是分库分表后,就无法在依靠数据库的 auto_increment 属性来唯一标识一条记录了。此时我们就可以用 zookeeper 在分布式环境下生成全局唯一ID。

它的原理就是创建顺序节点时后面会跟随一个唯一编号

public class GloballyUniqueId implements Watcher {

// zk连接对象
ZooKeeper zooKeeper;
// 计数器对象
private CountDownLatch countDownLatch = new CountDownLatch(1);
// zk的连接串
private String ip = "127.0.0.1:2181";
// 用户生成序号的节点
private String defaultPath = "/uniqueId";

/**
* 构造方法
*/
public GloballyUniqueId() {
try {
// 打开连接对象
zooKeeper = new ZooKeeper(ip, 5000, this);
// 阻塞线程
countDownLatch.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}

@Override
public void process(WatchedEvent event) {
// 判断连接状态...
}

/**
* 生成ID的方法
*/
public String getUniqueId() {
String path = "";
try {
/*
第一个参数:节点的路径
第二个参数:节点的数据
第三个参数:权限列表 ZooDefs.Ids.OPEN_ACL_UNSAFE:world:anyone:cdrwa /
第四个参数:节点的类子持久化节点
*/
path = zooKeeper.create(defaultPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (Exception e) {
e.printStackTrace();
}
// 裁切掉 /uniqueId 只保留后面的编号
return path.substring(9);
}

public static void main(String[] args) {
GloballyUniqueId globallyUniqueId = new GloballyUniqueId();
for (int i = 0; i < 5; i++) {
String uniqueId = globallyUniqueId.getUniqueId();
System.out.println(uniqueId);
}
}
}